sketch out sync codecs + threadpool #3715
d-v-b wants to merge 31 commits into zarr-developers:main from
## Conversation
docs/design/sync-bypass.md (Outdated)

```diff
@@ -0,0 +1,228 @@
+# Design: Fully Synchronous Read/Write Bypass
```
performance impact ranges from "good" to "amazing", so I think we want to learn from this PR. IMO this is NOT a merge candidate, but rather should function as a proof-of-concept for what we can get if we rethink our current codec API. Some key points:
|
|
the current performance improvements are without any parallelism. I'm adding that now.
|
the latest commit adds thread-based parallelism to the synchronous codec pipeline. we compute an estimated compute cost based on the chunk size, codecs, and operation (encode / decode), and use that estimate to choose a parallelism strategy, ranging from no threads to full use of a thread pool.
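A minimal sketch of what cost-based worker selection could look like. All names (`estimate_cost`, `choose_workers`, the threshold and worker-cap constants) are illustrative, not the PR's actual API:

```python
# Hypothetical cost model: total bytes pushed through all codecs is a rough
# proxy for compute work. Constants here are illustrative placeholders.
_MAX_WORKERS = 8
_COST_PER_WORKER = 1_000_000  # estimated bytes of codec work per thread


def estimate_cost(n_chunks: int, chunk_nbytes: int, n_codecs: int) -> int:
    """Rough proxy for total codec work: bytes processed across all codecs."""
    return n_chunks * chunk_nbytes * max(n_codecs, 1)


def choose_workers(n_chunks: int, chunk_nbytes: int, n_codecs: int) -> int:
    """Map estimated cost to a worker count, from 0 (run inline) to a full pool."""
    cost = estimate_cost(n_chunks, chunk_nbytes, n_codecs)
    if cost < _COST_PER_WORKER:
        return 0  # too little work to pay for thread dispatch overhead
    return min(n_chunks, _MAX_WORKERS, cost // _COST_PER_WORKER)
```

The key design point, per the comment above, is the graded response: tiny workloads stay inline, and the pool only ramps up when the estimated work justifies it.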
|
marking this as not a draft, because I think we should actually consider merging it.
mkitti left a comment
Could we adjust work estimates based on codec parameters?
```python
_MIN_CHUNK_NBYTES_FOR_POOL = 100_000  # 100 KB


def _choose_workers(n_chunks: int, chunk_nbytes: int, codecs: Iterable[Codec]) -> int:
```
Can this be `def _use_thread_pool(...) -> bool` instead?
```diff
-def _get_pool(max_workers: int) -> ThreadPoolExecutor:
-    """Get a thread pool with at most *max_workers* threads."""
+def _get_pool() -> ThreadPoolExecutor:
```
hard to see why this had to change, but... I'm not opposed to it.
|
The changes here improve performance a lot, but I think we can do even better with a more comprehensive set of changes. I had Claude cook up a planning document based on zarrs and tensorstore here: https://hackmd.io/A933wEUwQjOx8rJmmWo13A. Please review that document. I will use this plan to guide the next round of performance improvements. Not sure if they will be in this PR or a subsequent one.
```diff
 assert get_pipeline_class().__name__ != ""

-config.set({"codec_pipeline.name": "zarr.core.codec_pipeline.BatchedCodecPipeline"})
+config.set({"codec_pipeline.path": "zarr.core.codec_pipeline.BatchedCodecPipeline"})
```
```python
# _open() from a sync context, so we replicate its logic here.
# -------------------------------------------------------------------


def get_sync(
```
are we able to share sync/async code paths at all?
```python
# Minimum chunk size (in bytes) to consider using the thread pool.
# Below this, per-chunk codec work is too small to offset dispatch overhead.
_MIN_CHUNK_NBYTES_FOR_POOL = 100_000  # 100 KB
```
let's make this a config, so it's easy to experiment with
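A sketch of the config-driven threshold being suggested. Zarr's real config is donfig-based; this dict-backed stand-in (key name and helpers are hypothetical) just shows the lookup pattern:

```python
# Hypothetical config-backed threshold: read at call time so experiments can
# tweak it at runtime without a restart. Key name is illustrative.
_DEFAULTS = {"threading.min_chunk_nbytes_for_pool": 100_000}
_config: dict = dict(_DEFAULTS)


def set_config(key: str, value: int) -> None:
    _config[key] = value


def min_chunk_nbytes_for_pool() -> int:
    """Look up the pool threshold from config instead of a module constant."""
    return _config["threading.min_chunk_nbytes_for_pool"]
```

Reading the value inside the function (rather than caching it at import time) is what makes `config.set(...)`-style experimentation work.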
```python
if self._all_sync:
    # Streaming per-chunk pipeline: each chunk flows through
    # read_existing → decode → merge → encode → write as a single
    # task. Running N tasks concurrently overlaps IO with compute.
    async def _write_chunk(
        byte_setter: ByteSetter,
```
why is there an async func under `self._all_sync`? Seems like a naming issue that is very confusing to me right now
we send this function into `concurrent_map`, which is why it needs to be async
This is a work in progress with all the heavy lifting done by Claude. The goal is to improve the performance of our codecs by avoiding overhead in `to_thread` and other async machinery. At the moment we have deadlocks in some of the array tests, but I am opening this now as a draft to see if the benchmarks show anything promising.